package com.it.cloud.producer.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

/**
 * Custom Kafka {@link Partitioner} that routes a record by treating its key as an
 * integer ID and taking that ID modulo a fixed partition count.
 *
 * <p>Records with a {@code null} key, or a key whose string form cannot be parsed as
 * an {@code int}, are sent to partition 0.
 *
 * <p>NOTE(review): the partition count is hard-coded and is not checked against the
 * {@link Cluster} metadata; confirm that every topic using this partitioner has at
 * least {@code PARTITIONS} partitions.
 */
public class MyPartitionor implements Partitioner {

    /**
     * Number of partitions the key space is spread over.
     */
    private static final int PARTITIONS = 5;

    /**
     * Chooses a partition as {@code id mod PARTITIONS}, where {@code id} is the
     * integer parsed from {@code String.valueOf(key)}.
     *
     * @param topic      topic name (not used by this implementation)
     * @param key        record key; expected to render as a decimal integer
     * @param keyBytes   serialized key (not used)
     * @param value      record value (not used)
     * @param valueBytes serialized value (not used)
     * @param cluster    cluster metadata (not used)
     * @return a partition index in the range {@code [0, PARTITIONS)}; 0 when the key
     *         is {@code null} or is not a parseable {@code int}
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) {
            return 0;
        }
        String id = String.valueOf(key);
        try {
            // floorMod rather than %: the remainder operator yields a negative result
            // for negative IDs, which would not be a valid partition index.
            return Math.floorMod(Integer.parseInt(id), PARTITIONS);
        } catch (NumberFormatException e) {
            // Non-numeric (or out-of-int-range) keys fall back to partition 0.
            System.out.println("Parse message key exception:" + e);
            return 0;
        }
    }

    /**
     * No configuration is read by this partitioner.
     *
     * @param arg0 producer configuration (ignored)
     */
    @Override
    public void configure(Map<String, ?> arg0) {
    }

    /**
     * Nothing to release; this partitioner holds no resources.
     */
    @Override
    public void close() {
    }

}
